-
Notifications
You must be signed in to change notification settings - Fork 750
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[GOBBLIN-2173] Avoid Adhoc flow spec addition for non leasable entity #4076
Conversation
@@ -17,13 +17,16 @@ | |||
|
|||
package org.apache.gobblin.service.modules.orchestration; | |||
|
|||
import com.linkedin.restli.common.HttpStatus; | |||
import com.linkedin.restli.server.RestLiServiceException; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please fix import ordering, see from : https://gobblin.apache.org/docs/developer-guide/CodingStyle/
this.specCompiler.onAddSpec(flowSpec); | ||
} | ||
|
||
private void processFlowSpecForAdhocFlows(FlowSpec flowSpec) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this function is validating whether or not the flow should be created based on whether it's a duplicate adhoc flow within a short span, can this be named differently to convey it's purpose like validateConcurrentAdhocFlowCreation ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated the name appropriately.
String flowGroup = "testGroup"; | ||
DagActionStore.DagAction dagAction = new DagActionStore.DagAction(flowName, flowGroup, System.currentTimeMillis(), "testJob", DagActionStore.DagActionType.LAUNCH); | ||
DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction); | ||
leaseArbiter.existsLeasableEntity(leaseParams); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No assertions? Also in this unit test we are mocking leaseArbiter, so I don't think it adds any value on testing the functionality of leaseArbiter
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for pointing out missed adding the Assertion for this test, purpose of this test is to verify that MySQLStatemanagementStore returns the value returned by leaseArbiter, also updated this test.
ffa19e1
to
483e2b6
Compare
483e2b6
to
618f357
Compare
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/LeaseUnavailableException.java
Outdated
Show resolved
Hide resolved
@@ -133,6 +139,25 @@ public AddSpecResponse onAddSpec(Spec addedSpec) { | |||
return new AddSpecResponse<>(null); | |||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just an FYI, this also gets called during updating a flow. But since we have a condition of checking the flow is scheduled or not and we don't expect users to update an adhoc flow, we should be fine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But since this can still be called for adhoc flows, it would be good to test what the behaviour is. No need to handle it specially, but to know what the behaviour is would be good.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the callout, will add it for the test suite
* @param leaseParams uniquely identifies the flow, the present action upon it, the time the action | ||
* was triggered, and if the dag action event we're checking on is a reminder event | ||
*/ | ||
boolean canAcquireLeaseOnEntity(DagActionStore.LeaseParams leaseParams) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
consider renaming this to isLeaseAcquirable
for conciseness and to be consistent with other method names
.../src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java
Show resolved
Hide resolved
@@ -362,6 +362,12 @@ else if (leaseValidityStatus == 2) { | |||
} | |||
} | |||
|
|||
@Override | |||
public boolean canAcquireLeaseOnEntity(DagActionStore.LeaseParams leaseParams) throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add javadoc.. something like:
Determines if a lease can be acquired for the given flow. A lease is acquirable if ...
DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction, System.currentTimeMillis()); | ||
try { | ||
if (!dagManagementStateStore.canAcquireLeaseOnEntity(leaseParams)) { | ||
throw new LeaseUnavailableException("Lease already occupied by another execution of this flow"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add an info log here with flowGroup
, flowName
.. it would be useful in debugging
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added lease params which contains details of flow name and flow group
@@ -133,6 +139,25 @@ public AddSpecResponse onAddSpec(Spec addedSpec) { | |||
return new AddSpecResponse<>(null); | |||
} | |||
|
|||
private void validateAdhocFlowLeasability(FlowSpec flowSpec) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add javadoc
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated
} | ||
|
||
@Test | ||
public void testOnAddSpec_withFlowSpec_Available() throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use camelcase for naming methods
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
ConfigBuilder configBuilder = ConfigBuilder.create() | ||
.addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup") | ||
.addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName") | ||
.addPrimitive(ConfigurationKeys.JOB_SCHEDULE_KEY, "0 1/0 * ? * *") | ||
.addPrimitive("gobblin.flow.sourceIdentifier", "source") | ||
.addPrimitive("gobblin.flow.destinationIdentifier", "destination"); | ||
Config config = configBuilder.build(); | ||
FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we use this.flowSpec
here since it already has the schedule?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need different configs in flowspec depending on the case we are testing and hence would prefer to define new ones.
and throw LeaseUnavailableException | ||
*/ | ||
@Test(expectedExceptions = LeaseUnavailableException.class) | ||
public void testOnAddSpec_withFlowSpec_leaseUnavailable() throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's also test the scenario when canAcquireLeaseOnEntity
returns true
for adhoc flow since the other test is for scheduled flow
2ca43c7
to
30929f8
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good impl and nice test coverage!
my suggestions are around naming and the importance of encapsulating impl details strictly within their own (lower) levels - don't let them leak upwards through each successive calling layer!
...er/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java
Outdated
Show resolved
Hide resolved
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/LeaseUnavailableException.java
Outdated
Show resolved
Hide resolved
* An {@link RuntimeException} thrown when lease cannot be acquired on provided entity. | ||
*/ | ||
public class LeaseUnavailableException extends RuntimeException { | ||
public LeaseUnavailableException(String message) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
beyond clearly naming for callers, impl-wise, this definitely relates to a flow, so that should be a ctor param. consider whether to allow a catcher to reach in to access the details as instance member(s) or merely to use internally in the ctor, to contextualize the message
passed along to super
.
.../src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
Outdated
Show resolved
Hide resolved
@@ -61,6 +61,17 @@ public interface MultiActiveLeaseArbiter { | |||
LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams, boolean adoptConsensusFlowExecutionId) | |||
throws IOException; | |||
|
|||
/** | |||
* This method checks if lease can be acquired on provided flow in lease params | |||
* returns true if entry for the same flow does not exists within epsilon time |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
very reasonable method-level javadoc... but it turns out epsilon
is not mentioned anywhere in class-level javadoc, so this method description lacks context.
so, please add the class-level info. mentioning the name 'epsilon' is fine, but definitely also give it a more specific name, like "Lease Consolidation Period".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated
Assert.assertTrue(firstLaunchStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus); | ||
completeLeaseHelper(launchLeaseParams3); | ||
Thread.sleep(LESS_THAN_EPSILON); | ||
Assert.assertFalse(mysqlMultiActiveLeaseArbiter.isLeaseAcquirable(launchLeaseParams3)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the whole idea is that a "similar" (but NOT same) lease isn't itself already within epsilon. hence, be sure to test LeaseParams
that were NOT just given to tryAcquireLease
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
created new launch param launchLeaseParams3_similar
.../java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
Outdated
Show resolved
Hide resolved
...service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
Outdated
Show resolved
Hide resolved
...service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
Show resolved
Hide resolved
...service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
Outdated
Show resolved
Hide resolved
@@ -256,7 +257,10 @@ public CreateKVResponse<ComplexResourceKey<FlowId, FlowStatusId>, FlowConfig> cr | |||
responseMap = this.flowCatalog.put(flowSpec, true); | |||
} catch (QuotaExceededException e) { | |||
throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, e.getMessage()); | |||
} catch (Throwable e) { | |||
} catch(LeaseUnavailableException e){ | |||
throw new RestLiServiceException(HttpStatus.S_409_CONFLICT, e.getMessage()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
usually exception messages are designed for logging, more than for end-user consumption, so probably not appropriate to blindly return that. (it's sometimes done for a 5xx error, as above... but even that can be inadvisable.)
anyway, the 409 above might offer a better template:
return new CreateKVResponse<>(new RestLiServiceException(HttpStatus.S_409_CONFLICT,
"FlowSpec with URI " + flowSpec.getUri() + " was launched less than N secs ago, no action will be taken"));
(to provide N we may wish to tunnel the value of epsilon... or at least how many secs remain before a subsequent launch would be possible)
also: when do we want to return
(as that 409 above does), vs. throw
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated as discussed offline
dbbf10b
to
0981b82
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
very close!
...er/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java
Outdated
Show resolved
Hide resolved
...in-runtime/src/main/java/org/apache/gobblin/runtime/api/TooSoonToRerunSameFlowException.java
Outdated
Show resolved
Hide resolved
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
Outdated
Show resolved
Hide resolved
* Returns true if lease can be acquired on entity provided in leaseParams. | ||
* @param leaseParams uniquely identifies the flow, the present action upon it, the time the action was triggered, | ||
* and if the dag action event we're checking on is a reminder event | ||
* Check if an action exists in dagAction store by flow group, flow name, flow execution id, and job name. | ||
* @param flowGroup flow group for the dag action | ||
* @param flowName flow name for the dag action | ||
* @param flowExecutionId flow execution for the dag action |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
javadoc seems out-of-date, esp. mentioning LeaseParams and DagAction
also, out-of-date:
Returns true if lease can be acquired on entity provided in leaseParams.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated javadoc
@@ -63,13 +63,13 @@ LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams, boole | |||
|
|||
/** | |||
* This method checks if lease can be acquired on provided flow in lease params | |||
* returns true if entry for the same flow does not exists within epsilon time | |||
* returns true if entry for the same flow does not exists within Lease Consolidation Period |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sense is reversed here...
maybe:
Check whether the same flowGroup+flowName is within the Lease Consolidation Period (aka. epsilon) from other, unrelated leasing activity
this is also out-of-date:
@return true if lease can be acquired on the flow passed in the lease params, false otherwise
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated javadoc
if (!dagManagementStateStore.isLeaseAcquirable(leaseParams)) { | ||
throw new LeaseUnavailableException("Lease already occupied by another execution of this flow"); | ||
if (dagManagementStateStore.existsCurrentlyLaunchingSimilarFlow(flowGroup, flowName, FlowUtils.getOrCreateFlowExecutionId(flowSpec))) { | ||
throw new TooSoonToRerunSameFlowException("Lease already occupied by another execution of this flow", flowSpec); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
-
we have an
.info
line above announcing the check, so let's follow w/ a.warn
line here when the check fails. suggest: "another recent adhoc flow exec found for...." -
exception msg could benefit from minor improvements, yet--however it's phrased--it belongs encapsulated in the
TooSoonToRerun...
ctor, which should take solely aFlowSpec
param
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated, Thanks for the suggestion
.../java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java
Outdated
Show resolved
Hide resolved
private static final long flowExecutionId = 12345677L; | ||
private static final long flowExecutionId1 = 12345996L; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I never noticed before that flowExecutionId
, which is customarily millis-since-epoch only has 7 digits when it should have 10. let's fix that and also define this as:
private static final long flowExecutionIdAlt = flowExecutionId + ...; // whatever you consider a reasonable (later) offset
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated the same variables to store millis
.../java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
Outdated
Show resolved
Hide resolved
...service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
Outdated
Show resolved
Hide resolved
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## master #4076 +/- ##
============================================
- Coverage 51.47% 42.03% -9.45%
+ Complexity 7546 2334 -5212
============================================
Files 1386 496 -890
Lines 52105 20961 -31144
Branches 5727 2433 -3294
============================================
- Hits 26821 8810 -18011
+ Misses 22990 11238 -11752
+ Partials 2294 913 -1381 ☔ View full report in Codecov by Sentry. 🚨 Try these New Features:
|
if (error instanceof TooSoonToRerunSameFlowException) { | ||
throw (TooSoonToRerunSameFlowException) error; | ||
if (error instanceof RuntimeException && error.getCause() instanceof TooSoonToRerunSameFlowException) { | ||
throw (TooSoonToRerunSameFlowException) error.getCause(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the cast isn't necessary (the reason I suggested wrapping TooSoonToR...
was to enable uniform, type-agnostic code here.)
for a method with the signature throws Throwable
, aren't these two equivalent?
throw error.getCause();
throw (T) error.getCause();
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
makes sense updated
@Test(expectedExceptions = TooSoonToRerunSameFlowException.class) | ||
@Test(expectedExceptions = RuntimeException.class) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
after signing off, I realized my literal advice would compromise clarity, foul up tests, etc. (in this very way)
sorry for that half-baked advice... try this instead:
public static class TooSoonToRerunSameFlowException extends RuntimeException {
@Getter private final FlowSpec flowSpec;
/**
* Account for unwrapping within @{link FlowCatalog#updateOrAddSpecHelper}`s `CallbackResult` error handling for `SpecCatalogListener`s
* @return `TooSoonToRerunSameFlowException` wrapped in another `TooSoonToRerunSameFlowException
*/
public static TooSoonToRerunSameFlowException wrappedOnce(FlowSpec flowSpec) {
return new TooSoonToRerunSameFlowException(flowSpec, new TooSoonToRerunSameFlowException(flowSpec));
}
public TooSoonToRerunSameFlowException(FlowSpec flowSpec) {
this(flowSpec, null);
}
/** restricted-access ctor: use {@link #wrappedOnce(String)} instead */
private TooSoonToRerunSameFlowException(FlowSpec flowSpec, Throwable cause) {
super("Lease already occupied by another recent execution of this flow: " + flowSpec, cause);
this.flowSpec = flowSpec;
}
}
then replace:
throw new RuntimeException(new TooSoonToRerunSameFlowException(flowSpec));
with
throw TooSoonToRerunSameFlowException.wrappedOnce(flowSpec);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure updated as per suggestion
/* | ||
Determines if a lease can be acquired for the given flow. A lease is acquirable if | ||
no existing lease record exists in arbiter table or the record is older then epsilon time | ||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
probably no need for this comment here in the impl, but if you want one, bring it into line w/ the orig from the interface
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
makes sense, since javadoc already there for the interface, removed comment from here
@@ -24,6 +24,7 @@ | |||
import java.util.Properties; | |||
import java.util.concurrent.TimeUnit; | |||
|
|||
import org.apache.gobblin.runtime.api.TooSoonToRerunSameFlowException; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this import belongs a few lines down w/ other apache gobblin pkgs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated
@@ -133,6 +137,29 @@ public AddSpecResponse onAddSpec(Spec addedSpec) { | |||
return new AddSpecResponse<>(null); | |||
} | |||
|
|||
/* | |||
enforces that a similar adhoc flow is not launching, | |||
else throw TooSoonToRerunSameFlowException |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: {@link TooSoonToRerunSameFlowException}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure updated
3bdead6
to
01cac22
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
excellent improvement with great impl and tests - very nice work!
lease consolidation time, then we do not execute this flow, hence do not process and store the spec | ||
and throw RuntimeException | ||
*/ | ||
@Test(expectedExceptions = RuntimeException.class) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
forgot to change this back to TooSoonToRerun...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
my bad, updated
|
||
|
||
/** | ||
* An {@link RuntimeException} thrown when lease cannot be acquired on provided entity. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
needs update
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated javadoc
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
excellent!
Dear Gobblin maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
JIRA
Description
https://issues.apache.org/jira/browse/GOBBLIN-2173
Before launching the flows we store adhoc flows temporarily in our flowspec DB. In case of concurrent execution of the flows within the epsilon time defined the first execution is launched successfully, but for the second execution flow spec entry is created but launching is not successful since lease cannot be acquired, for this scenario we should prevent creation of flowspec entry in the spec store, this PR contains code change to address this.
Tests
My PR adds the following unit tests OR does not need testing for this extremely good reason:
Tests added to ensure onAddSpec in Orchestrator is successful only if lease can be acquired on entity(call to function leasableEntryExists), if it does not then throw LeaseNotAvailable Exception
Tests in MysqlMultiActiveLeaseArbiter to ensure existsLeasableEntry returns false, if tried to call this function within epsilon time of acquiring the lease else return true.
existing tests pass
Commits